Join two Spark mllib pipelines together
Asked Answered
I have two separate DataFrames, each with several different processing stages that I handle with mllib transformers in a pipeline.

I now want to join these two pipelines together, keeping the features (columns) from each DataFrame.

Scikit-learn has the FeatureUnion class for handling this, but I can't seem to find an equivalent in mllib.

I could add a custom transformer stage at the end of one pipeline that takes the DataFrame produced by the other pipeline as an attribute and joins it in the transform method, but that seems messy.

Lind answered 15/6, 2017 at 14:27 Comment(2)
is it a join or a union you are looking for? both can be handled using dataframes. – Felixfeliza
@Felixfeliza it was a join; however, I wanted to do it as a pipeline stage so that I would have schema checking along my entire pipeline. – Lind

Pipeline and PipelineModel are both valid PipelineStages, and as such can be combined in a single Pipeline. For example, with:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

df = spark.createDataFrame([
    (1.0, 0, 1, 1, 0),
    (0.0, 1, 0, 0, 1)
], ("label", "x1", "x2", "x3", "x4"))

pipeline1 = Pipeline(stages=[
    VectorAssembler(inputCols=["x1", "x2"], outputCol="features1")
])

pipeline2 = Pipeline(stages=[
    VectorAssembler(inputCols=["x3", "x4"], outputCol="features2")
])

you can combine Pipelines:

Pipeline(stages=[
    pipeline1, pipeline2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df).show(truncate=False)
+-----+---+---+---+---+---------+---------+-----------------+
|label|x1 |x2 |x3 |x4 |features1|features2|features         |
+-----+---+---+---+---+---------+---------+-----------------+
|1.0  |0  |1  |1  |0  |[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|0.0  |1  |0  |0  |1  |[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

or pre-fitted PipelineModels:

model1 = pipeline1.fit(df)
model2 = pipeline2.fit(df)

Pipeline(stages=[
    model1, model2, 
    VectorAssembler(inputCols=["features1", "features2"], outputCol="features")
]).fit(df).transform(df).show()
+-----+---+---+---+---+---------+---------+-----------------+
|label| x1| x2| x3| x4|features1|features2|         features|
+-----+---+---+---+---+---------+---------+-----------------+
|  1.0|  0|  1|  1|  0|[0.0,1.0]|[1.0,0.0]|[0.0,1.0,1.0,0.0]|
|  0.0|  1|  0|  0|  1|[1.0,0.0]|[0.0,1.0]|[1.0,0.0,0.0,1.0]|
+-----+---+---+---+---+---------+---------+-----------------+

So if your data starts out in two separate DataFrames, the approach I would recommend is to join them beforehand, and then fit and transform the whole DataFrame with a single Pipeline.

Exact answered 15/6, 2017 at 17:4 Comment(2)
Here we have a single dataframe, but say we have two different dataframes with different numbers of rows — how does a spark ml pipeline join these internally? Is it a cross join or something else? – Exodontist
Is there any performance difference between using 2 pipelines separately vs using one single pipeline? – Zacharia
